Glue StudioでSnowflake connectivityを使ってSnowflakeとS3のデータ連携をしてみた

Glue StudioでSnowflake connectivityを使ってSnowflakeとS3のデータ連携をしてみた

Glue StudioでSnowflakeのソース・ターゲットを使ったGlueジョブを作成し、Glue接続に保存した接続情報を使ってデータ連携をすることができました。SnowflakeとAWS間でデータのやりとりをしたいユースケースにおすすめです。
Clock Icon2023.07.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部 機械学習チームの鈴木です。

Amazon S3とSnowflake間のデータ連携の方法を改めて探していたのですが、ちょうどSnowflake connectivityの一般提供を開始したアナウンスがあったので試してみました。AWS Glue for Apache SparkのSnowflakeへのネイティブな接続の仕組みになります。

この記事について

概要

まず、以下の順番を踏みつつ、Glue Sudioから利用できるSnowflakeのターゲットを使い、S3にあるデータをSnowflakeにアップロードするGlueジョブの作成と動作確認をしてみました。

  1. Snowflake側でデータベースおよびユーザーの準備をする。
  2. AWS Secrets ManagerとGlue接続で接続情報を登録する。Glueジョブで使うIAMロールを作成する。
  3. Glueジョブを作成する。
  4. GlueジョブでデータをSnowflakeにロードする。

以下の『Connecting to Snowflake in AWS Glue Studio (preview)』ガイドを参考に設定しました。

※ これは冒頭のアナウンスページで紹介されていたガイドですが、記事執筆時点ではpreviewの文言があったため、そのように記載しています。使用する際はご留意下さい。

その後、Snowflakeのソースを使い、S3にデータを取得する逆方向の連携のGlueジョブも作成してみました。

前提

Glueジョブでデータを読み書きするS3バケットはすでに作成されているものとします。

S3バケットからSnowflakeに送るデータについては、以下のブログで用意した、UCI Machine Learning RepositoryのIris Data Setを使用しました。

下記リンクにて公開されています。

https://archive.ics.uci.edu/ml/datasets/iris

データをダウンロードし、iris.dataをS3バケットにアップロードし、iris_quality_checkというGlueテーブルを作成しておきました。

iris.dataは元データの最後に空行が2行入っており、Glueジョブで処理する際に1行分空のレコードとして処理されてしまったため、空行は1行削除しておきました。

Snowflakeへのデータの連携

1. Snowflake側の準備

データベースの作成

Snowsightで、SYSADMINにロールを切り替えて、データベース画面からML_DATABASEという名前でデータベースを作成しました。

データベースの作成

新しいデータベース

テーブル作成

ML_DATABASE.PUBLIC.IRISテーブルを作成しました。

Snowsightで、SQLワークシートを開き、ワークシート左上のデータベース・スキーマを選択する箇所でML_DATABASE.PUBLICを選びました。

SYSADMINで以下のSQLを実行し、テーブルを作成しました。

CREATE OR REPLACE TABLE IRIS
(
  sepal_length FLOAT,
  sepal_width FLOAT,
  petal_length FLOAT,
  petal_width FLOAT,
  class STRING
);

ユーザーへのロールの作成

ML_DATABASE.PUBLIC.IRISテーブルに対する一連の権限を持ったGLUEJOB_OPERATORユーザーを作成しました。

Snowsightで、SQLワークシートを開き、ワークシート左上のデータベース・スキーマを選択する箇所でML_DATABASE.PUBLICを選びました。

ACCOUNTADMINで以下のSQLを実行し、ロールとユーザーを作成しました。

-- ROLEの作成
CREATE ROLE GLUEJOB_ROLE;

-- ウェアハウスへのUSAGEアクセス権の付与
GRANT USAGE ON WAREHOUSE 使用するウェアハウス TO ROLE GLUEJOB_ROLE;

-- データベースへのUSAGEアクセス権の付与
GRANT USAGE ON DATABASE ML_DATABASE TO ROLE GLUEJOB_ROLE;

-- スキーマへのアクセス権の付与
GRANT USAGE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE GLUEJOB_ROLE;
GRANT CREATE TABLE ON SCHEMA ML_DATABASE.PUBLIC TO ROLE GLUEJOB_ROLE;

-- テーブルへの権限の付与
GRANT SELECT, INSERT, UPDATE, DELETE, TRUNCATE ON ALL TABLES IN SCHEMA PUBLIC TO ROLE GLUEJOB_ROLE;

-- ユーザーの作成
CREATE USER GLUEJOB_OPERATOR
    PASSWORD = 'パスワード'
    DEFAULT_ROLE = 'GLUEJOB_ROLE'
    DEFAULT_WAREHOUSE = '使用するウェアハウス'
    MUST_CHANGE_PASSWORD = FALSE;

-- ユーザーへのロールの付与
GRANT ROLE GLUEJOB_ROLE TO USER GLUEJOB_OPERATOR;

ポイントとしては以下です。

  • DEFAULT_WAREHOUSEはロードする際に参照するので設定する。
  • GlueジョブからCREATE TABLEなど発行するので、スキーマに対してその操作を行えるように権限をつける必要がある。Glue Stduioで生成されたPySparkのスクリプトに記載のSQL文をみて判断する。

処理実行後、意図通りに権限がついているか、SHOW GRANTSコマンドで確認するとよいかもしれません。

SHOW GRANTS TO ROLE GLUEJOB_ROLE;

2. Glueジョブ周辺リソースの準備

シークレットの作成

Snowflakeでユーザーを作成したので、Glueジョブから使えるようにシークレットを作成しました。

AWS Secrets Managerより、新しいシークレットを保存するを押して作成しました。

ポイントとして、ステップ1 シークレットのタイプを選択で、以下のように設定しました。

シークレットのタイプを選択

シークレットのタイプはその他のシークレットのタイプを選択しました。これはほかのタイプで該当するものがなさそうだったので消去法的に選びました。

sfUserにSnowflakeで作成したユーザー名、sfPasswordにそのパスワードを入れました。このユーザー名・パスワードのキーは、先に紹介したガイドにて指定があったものになります。

ステップ2以降は、名前だけは後から分かるものに設定して、残りはデフォルトのままとしました。

Glue接続の作成

Glueのコンソールより、以下のようにGlue接続を作成しました。

Glue接続の作成

  • コネクションタイプはSnowflakeにしました。
  • Snowflake URLは「https://アカウント識別子.snowflakecomputing.com」としました。アカウント識別子orgname-account_nameという形式にしており、作り方は『アカウント識別子 | Snowflake Documentation』のガイドをご確認ください。
  • AWS Secretは先に作成したものを使用しました。
  • Snowflake roleはoptionalですが、念の為設定しておきました。

Glueジョブ用のIAMロールの作成

以下のCloudFormationテンプレートを使い、Glueジョブで使うIAMロールを作成しました。

Parameters:
  DataBucketName:
    Description: Backet Name for Sample Data.
    Type: String
  SecretARN:
    Description: ARN of secret for snowflake user.
    Type: String

Resources:
  GlueStudioExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: AWSGlueServiceRole-Studio-CM-nayuts
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: glue.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: GlueStudioSecretManagerAccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: [
                  "secretsmanager:GetSecretValue"
                ]
                Resource: [
                  !Ref SecretARN
                ]
        - PolicyName: GlueStudioS3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: [
                  "s3:*"
                ]
                Resource: [
                  !Sub "arn:aws:s3:::${DataBucketName}",
                  !Sub "arn:aws:s3:::${DataBucketName}/*"
                ]
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

DataBucketNameで指定するデータがあるバケットへの権限については、Snowflakeに書き込む際と読み込む際の両方を考えて広めにつけました。運用の際にもう少し狭めたい場合は適宜修正してください。

3. Glueジョブの作成

Glue StudioにてGlueジョブを作成しました。

まず、以下のようにSnowflakeへのロード用のジョブを作成しました。

作成したロード用のジョブ

S3からデータを取得するノードは以下のようにしました。Glueテーブルからデータを読み出す形としました。

S3からデータを取得するノードの設定

Snowflakeへデータをロードするノードは以下のようにしました。

Snowflakeへデータをロードするノードの設定

Handling of data and target tableのオプションは充実しており、今回はTRUNCATE target tableにしました。実行のたびにテーブルを空にしてデータを入れてくれるので、洗い替え処理の実装にも使えますね。

Job detailsタブでIAM Roleとワーカー数の設定をしました。IAM Roleは先に作成したものを指定しました。ワーカー数は小さいデータなので最小にしました。これは実際のデータ量により調整する必要があります。

ジョブの詳細

Glue versionの箇所では、Snowflake connectionsに対応するGlueバージョンついての補足がでていますね。

これでGlueジョブの準備は終わりです。

ちなみに、生成されたPySparkのスクリプトは以下のようになっていました。データベース名は置き換えてあります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket extract
S3bucketextract_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="データベース名",
    table_name="iris_quality_check",
    transformation_ctx="S3bucketextract_node1",
)

# Script generated for node Snowflake load
Snowflakeload_node1690612413266 = glueContext.write_dynamic_frame.from_options(
    frame=S3bucketextract_node1,
    connection_type="snowflake",
    connection_options={
        "autopushdown": "on",
        "dbtable": "iris",
        "connectionName": "cm-nayuts-snowflake-connection",
        "preactions": "CREATE TABLE IF NOT EXISTS PUBLIC.iris (sepal_length VARCHAR, sepal_width VARCHAR, petal_length VARCHAR, petal_width VARCHAR, class VARCHAR); TRUNCATE TABLE PUBLIC.iris;",
        "sfDatabase": "ml_database",
        "sfSchema": "PUBLIC",
    },
    transformation_ctx="Snowflakeload_node1690612413266",
)

job.commit()

特にconnection_optionsに設定されているSQL文は、Snowflakeで発生したエラーが分かりやすくなるので確認すると良いと思います。

4. AWS Glueでのデータのロード

Glue StudioのRunボタンを押してジョブを実行しました。

jobの実行

RunsタブでRun statusSucceededになれば成功です。

実行結果

Snowsightからも確かにデータが入っていることを確認できました。

クエリの実行結果は、作成したユーザーのクエリ履歴から確認できます。

クエリの実行履歴

補足

Snowflakeへの接続でユーザーの権限が足りないなどの不備がおきがちですが、Snowsighのクエリ履歴から確認すると解決を進めやすかったです。

例えば、GlueジョブでAn error occurred while calling o98.pyWriteDynamicFrame. SQL access control error:というエラーで失敗してしまい、なにがダメだったんだろうとしばらく悩みました。

クエリ履歴を確認するとSQL access control error: Insufficient privileges to operate on schema 'PUBLIC'と出ていたため、スキーマに対する操作の権限が何か足りていなかったということが分かりました。

失敗の履歴例

なお、これはCREATE TABLEを実行する際に、Snowflakeのユーザーにスキーマへのテーブル作成の権限が付与されていなかったためでした。先に紹介したユーザーの作成のコマンドでは、これを許可するようにしてあります。

Snowflakeからのデータの取得

Snowflakeへのデータの連携が無事にできたので、接続設定などは流用しつつ、Snowflakeからのデータの取得も試してみました。

1. Glueジョブの作成

以下のようなジョブを作成しました。

取得用のジョブ

snowflakeからのデータ取得用ノードの設定です。

snowflakeからのデータ取得用ノードの設定

S3バケットへのデータ格納用ノードの設定です。結果確認が簡単なので、今回はCSV形式にしました。

S3バケットへのデータ格納用ノードの設定

生成されたPySparkのスクリプトは以下のようになっていました。格納先のバケット名は置き換えてあります。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Snowflake extract
Snowflakeextract_node1690636314100 = glueContext.create_dynamic_frame.from_options(
    connection_type="snowflake",
    connection_options={
        "autopushdown": "on",
        "dbtable": "iris",
        "connectionName": "cm-nayuts-snowflake-connection",
        "sfDatabase": "ml_database",
        "sfSchema": "PUBLIC",
    },
    transformation_ctx="Snowflakeextract_node1690636314100",
)

# Script generated for node S3 bucket load
S3bucketload_node3 = glueContext.write_dynamic_frame.from_options(
    frame=Snowflakeextract_node1690636314100,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://格納先のバケット名/from_snowflake/",
        "partitionKeys": [],
    },
    transformation_ctx="S3bucketload_node3",
)

job.commit()

2. Glueジョブの実行

Runボタンからジョブを実行し、成功することを確認しました。

S3バケットの出力先を確認すると、以下のように20個のファイルが出力されていました。

取得結果

試しに一つ開いてみると、データが正しく取得できていそうなことが確認できました。

SEPAL_LENGTH,SEPAL_WIDTH,PETAL_LENGTH,PETAL_WIDTH,CLASS
5.1,2.5,3.0,1.1,Iris-versicolor
6.8,3.0,5.5,2.1,Iris-virginica
4.4,2.9,1.4,0.2,Iris-setosa
6.0,3.4,4.5,1.6,Iris-versicolor
5.5,3.5,1.3,0.2,Iris-setosa
7.2,3.6,6.1,2.5,Iris-virginica
5.8,2.6,4.0,1.2,Iris-versicolor

ユースケースの考察

非常に簡単にAWSとSnowflakeの間のデータ連携ができることが確認できました。今回試したことを踏まえて、どのようなユースケースで使えそうか、例を考えてみました。

Snowflakeへのデータの連携

AWSにたまっているデータをSnowflakeへ連携する際に、Snowflakeへデータをロードできます。今回試したようにそのまま連携するのでも問題ありませんが、例えばGlueジョブにて個人情報などのマスクをした後で、Snowflakeに移すようなことが可能です。

PIIをマスクして連携

Glueジョブによる個人情報のマスクについては以下で取り上げました。

Glueジョブを挟むことで、GUI操作にて誰でも大規模なデータに対する加工を設定してSnowflakeに連携できます。機密性の高い情報はAWSに閉じて処理してしまい、Snowflake側でほかの社内SnowflakeアカウントとSecure Data Sharingでデータ共有するというような使い方にしても安心でよいですね。

Snowflakeからのデータの取得

例えば、AWS側の機械学習リソースを使いたいときが考えてみました。

SageMakerでのデータ利用

システム化する場合や、Snowflake側で機械学習用途に特化したデータマートを提供せずAWS側で追加の加工が必要な場合は、Glueジョブでデータを取得してPySparkで加工・結合などをした後、S3をインターフェースにSageMakerで利用するようなことができます。

アドホックに進めたい検証フェーズなど場合は、Snowpark Pythonなどでデータを直接Snowflakeから取得してもよいかもしれません。

最後に

AWS Glue for Apache Sparkで一般提供が開始された、Snowflakeへの接続を使って、S3との間で相互のデータ連携を試してみました。

特に認証情報や権限まわりでAWSとSnowflakeの双方で設定が必要ではありましたが、Glue Studioを使うことで非常に簡単に相互のデータ連携を実現することができました。

参考になりましたら幸いです。

そのほかに参考にした資料

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.